package com.atguigu.flink.chapter11.function;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import javax.xml.transform.Result;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class Flink04_Table_Aggregate {
       public static void main(String[] args) {
               Configuration configuration = new Configuration();
               //web  UI端口
               configuration.setInteger("rest.prot",10000);
               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
               env.setParallelism(2);

           DataStreamSource<WaterSensor> waterSensorStream =
                   env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                           new WaterSensor("sensor_1", 2000L, 20),
                           new WaterSensor("sensor_2", 3000L, 30),
                           new WaterSensor("sensor_1", 4000L, 40),
                           new WaterSensor("sensor_1", 5000L, 50),
                           new WaterSensor("sensor_2", 6000L, 60));

           // 1. 创建表的执行环境
           StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
           Table table = tableEnv.fromDataStream(waterSensorStream);
           tableEnv.createTemporaryView("sensor",table);

           //1.在table API 中使用
           // 1.1 内联的方式使用
           table
                   .groupBy($("id"))
                   .flatAggregate(call(Top2.class,$("vc")))
                   .select($("id"),$("rank"),$("value"))
                   .execute()
                   .print();

           // 1.2 函数先注册，再使用

           // 2. 无法  在sql 中使用 既有聚合又有制表结构


           }
           public static class Top2 extends TableAggregateFunction<Result,FirstSecond>{

               // 创建累加器
               @Override
               public FirstSecond createAccumulator() {
                   return new FirstSecond();
               }

               // 聚合函数

               public void accumulate(FirstSecond acc,Integer vc){
                   if (vc> acc.first) {
                       acc.second  = acc.first;
                       acc.first = vc;
                   }else if (vc > acc.second){
                       acc.second =vc;
                   }

               }
               // 发射函数
               // 第一个参数必须是累加器  就是输出每行元素
               public void emitValue(FirstSecond acc , Collector<Result> out){

                   out.collect(new Result("第一：",acc.first));
                   if (acc.second > 0) {
                       out.collect(new Result("第二", acc.second));
                   }
               }


           }


    public static class Result{
               public String rank;

                   public Result(String rank, Integer value) {
                       this.rank = rank;
                       this.value = value;
                   }

                   public Integer value;
           }
           public static class FirstSecond{
               public Integer first =0;
               public Integer second =0;
           }


}
/*
*   每来一条数据，输出水位的top2
*
*
*
* */
